Amazon ConnectからKinesis Video Streamに送信する音声データは、GetMediaForFragmentList APIで取得すべき理由
はじめに
以前、Amazon Connectでエージェントが介在しない「留守番電話」や「AIチャットボット」で録音したい場合、Kinesis Video Streams(以降、KVS)経由でAWS Lambdaを使い音声データの録音と保存する方法をまとめました。
ただし、ConnectからKVS経由で音声ファイルS3バケットに保存すると、録音した音声と異なる音声が時折含まれていました。
調査した結果、以下の記事でも同じ現象に言及しておりました。その記事では、KVSからメディアデータを取得する際にGetMedia
APIではなくGetMediaForFragmentList
APIを利用することで、この問題が解消されたと書かれていました。
今回の記事では、上記のGetMediaForFragmentList
を利用することで問題が解消された理由を解説し、先述の記事Node.jsで書かれていたため、Pythonのコードでの実装内容を明記します。
KVSに出てくる用語
下記の用語は、KVSで必須ですのでまとめました。
- メディアデータ
- 音声、動画、画像といった形式により人間が判断できる情報のことです。ビデオ、オーディオ、および関連メタデータから構成されます。
- チャンク
- チャンクは大きなメディアデータを取り扱いやすいサイズに分割した部分データです。
- 各チャンクはメディアデータ全体の一部を表し、独立してストリームに送信、処理されます。
- ストリーム内でのデータの格納形式であり、以下を含みます。
- メディアメタデータ
- フラグメント
- KVSのメタデータ(フラグメント番号、サーバー側タイムスタンプ、プロデューサー側タイムスタンプ)
- フラグメント
- 短時間のフレームをまとめたもので、固有の番号が割り当てられます。
- フレーム
- ビデオフレームは1つの静止画像のことで、連続して再生することで動画となります。音声フレームは、ある瞬間の音声データのことです。
- フレームはさまざまなメディアデータを組み合わせてメディアストリームを形成する基本要素です。
- プロデューサー
- KVSに送信する側(今回の場合、Connectに該当)
- コンシューマー
- KVSから受信する側(今回の場合、Lambdaに該当)
- KVSからチャンク単位で取得し、チャンクからフラグメントを取り出します。
下記にある通り、プロデューサー(Connectなど)からKVSにフラグメントを送信し、KVSからコンシューマー(Lambdaなど)はチャンクを取得します。
KVSからコンシューマーがチャンクを取得する場合、GetMedia
もしくはGetMediaForFragmentList
のAPIを利用する必要があります。APIの違いは次の章で解説します。
録音した音声とは別の音声原因
GetMedia
を利用しメディアデータ取得する場合、KVSのARNと取得する開始チャンクのフラグメント番号を指定し、開始チャンクから全てのフラグメントを取得します。
GetMedia
では、終了位置を指定することができないため、プロデューサーから送信された別の音声も取得してしまい、録音した音声とは別の音声が入る現象が発生したと考えられます。
ではどのように取得するとよいかと言いますと、ListFragments
とGetMediaForFragmentList
のAPIを利用します。
ListFragments
は、開始と終了時のタイムスタンプ(フラグメント番号も可)で指定し、指定範囲内のフラグメントのリストを取得するAPIです。
GetMediaForFragmentList
は、KVSに保存されたアーカイブデータから指定したフラグメントのメディアデータを取得するAPIです。フラグメント番号のリストを指定してリクエストします。
つまり、今回のケースでは、Connectから送信された必要なメディアデータのみを取得するには、ListFragments
で範囲内のフラグメントのリストを取得し、GetMediaForFragmentList
で範囲内のメディアデータのみを取得することで実現できます。
AWS Black Beltでは、GetMedia
APIはリアルタイム処理で利用し、バッチ処理では、ListFragments
とGetMediaForFragmentList
APIを利用するように記載がありました。
ConnectからKVSへの音声のストリーミングをする場合、バッチ処理でKVSから音声ファイルを取得します。
そのため、リアルタイム処理用途で利用されるGetMedia
APIを採用することがそもそもの間違えのようですね。
検証環境の構築
下記の記事では、ConnectからKVSに送信した音声データをLambdaで取得し、S3バケットに保存する手順を記載しております。
Lambdaのコード以外は下記の記事の手順で環境を構築します。
Lambdaのコード
import boto3, io, struct,json from ebmlite import loadSchema from enum import Enum from datetime import datetime, timedelta from botocore.config import Config JST_OFFSET = timedelta(hours=9) class Mkv(Enum): SEGMENT = 0x18538067 CLUSTER = 0x1F43B675 SIMPLEBLOCK = 0xA3 class Ebml(Enum): EBML = 0x1A45DFA3 class KVSParser: def __init__(self, media_content): self.__stream = media_content["Payload"] self.__schema = loadSchema("matroska.xml") self.__buffer = bytearray() @property def fragments(self): return [fragment for chunk in self.__stream if (fragment := self.__parse(chunk))] def __parse(self, chunk): self.__buffer.extend(chunk) header_elements = [e for e in self.__schema.loads(self.__buffer) if e.id == Ebml.EBML.value] if header_elements: fragment_dom = self.__schema.loads(self.__buffer[:header_elements[0].offset]) self.__buffer = self.__buffer[header_elements[0].offset:] return fragment_dom def get_simple_blocks(media_content): parser = KVSParser(media_content) return [ b.value for document in parser.fragments for b in next(filter(lambda c: c.id == Mkv.CLUSTER.value, next(filter(lambda s: s.id == Mkv.SEGMENT.value, document)))) if b.id == Mkv.SIMPLEBLOCK.value ] def create_audio_sample(simple_blocks, margin=4): total_length = sum(len(block) - margin for block in simple_blocks) combined_samples = bytearray(total_length) position = 0 for block in simple_blocks: temp = block[margin:] combined_samples[position:position+len(temp)] = temp position += len(temp) return combined_samples def convert_bytearray_to_wav(samples): length = len(samples) channel = 1 bit_par_sample = 16 format_code = 1 sample_rate = 8000 header_size = 44 wav = bytearray(header_size + length) wav[0:4] = b"RIFF" wav[4:8] = struct.pack("<I", 36 + length) wav[8:12] = b"WAVE" wav[12:16] = b"fmt " wav[16:20] = struct.pack("<I", 16) wav[20:22] = struct.pack("<H", format_code) wav[22:24] = struct.pack("<H", channel) wav[24:28] = struct.pack("<I", sample_rate) wav[28:32] = struct.pack("<I", sample_rate * channel * bit_par_sample // 8) wav[32:34] = struct.pack("<H", channel * bit_par_sample // 8) wav[34:36] = struct.pack("<H", bit_par_sample) wav[36:40] = b"data" wav[40:44] = struct.pack("<I", length) wav[44:] = samples return wav def create_archive_media_client(ep): region_name = "ap-northeast-1" return boto3.client("kinesis-video-archived-media", endpoint_url=ep, config=Config(region_name=region_name)) def upload_audio_to_s3(bucket_name, audio_data, filename): s3_client = boto3.client("s3") s3_client.upload_fileobj(io.BytesIO(audio_data), bucket_name, filename, ExtraArgs={"ContentType": "audio/wav"}) def get_media_data(arn, start_timestamp, end_timestamp): kvs_client = boto3.client("kinesisvideo") list_frags_ep = kvs_client.get_data_endpoint(StreamARN=arn, APIName="LIST_FRAGMENTS")["DataEndpoint"] list_frags_client = create_archive_media_client(list_frags_ep) fragment_list = list_frags_client.list_fragments( StreamARN=arn, FragmentSelector={ "FragmentSelectorType": "PRODUCER_TIMESTAMP", "TimestampRange": {"StartTimestamp": start_timestamp, "EndTimestamp": end_timestamp} } ) sorted_fragments = sorted(fragment_list["Fragments"], key=lambda fragment: fragment["ProducerTimestamp"]) fragment_number_array = [fragment["FragmentNumber"] for fragment in sorted_fragments] get_media_ep = kvs_client.get_data_endpoint(StreamARN=arn, APIName="GET_MEDIA_FOR_FRAGMENT_LIST")["DataEndpoint"] get_media_client = create_archive_media_client(get_media_ep) media = get_media_client.get_media_for_fragment_list(StreamARN=arn, Fragments=fragment_number_array) return media def transcribe_audio(bucket_name, filename, job_name, vocabulary_name): transcribe_client = boto3.client("transcribe") transcribe_client.start_transcription_job( TranscriptionJobName=job_name, Media={'MediaFileUri': f"s3://{bucket_name}/{filename}"}, MediaFormat='wav', LanguageCode='ja-JP', Settings={'VocabularyName': vocabulary_name} ) return job_name def convert_ms_to_datetime(timestamp_ms_str, add_seconds=1): # 音声の尻切れのため、add_seconds timestamp_seconds = float(timestamp_ms_str) / 1000 + add_seconds return datetime.utcfromtimestamp(timestamp_seconds) def lambda_handler(event, context): print('Received event:' + json.dumps(event, ensure_ascii=False)) media_streams = event["Details"]["ContactData"]["MediaStreams"]["Customer"]["Audio"] stream_arn = media_streams["StreamARN"] start_timestamp = convert_ms_to_datetime(media_streams["StartTimestamp"]) end_timestamp = convert_ms_to_datetime(media_streams["StopTimestamp"]) combined_samples = create_audio_sample( get_simple_blocks(get_media_data(stream_arn, start_timestamp, end_timestamp))) wav_audio = convert_bytearray_to_wav(combined_samples) bucket_name = "バケット名" jst_time = datetime.utcnow() + JST_OFFSET filename = f"output_{jst_time.strftime('%Y%m%d_%H%M%S')}.wav" upload_audio_to_s3(bucket_name, wav_audio, filename) return { "statusCode": 200, "body": "Audio uploaded successfully!" }
S3のバケット名は、各自変更下さい。
関数get_media_data
の処理の流れを解説します。
GetDataEndpoint
を利用- KVSのARNから、
ListFragments
APIの呼び出しに必要なエンドポイントを取得します
- KVSのARNから、
ListFragments
を利用- KVSのARNと開始と終了のタイムスタンプから、指定したタイムスタンプ範囲に該当するフラグメントのリストを取得します
- sortedを利用
ListFragments
で取得されるフラグメントは順不同なのでProducerTimestampで昇順に並び替えます- ProducerTimestamp は、KVSのフラグメントに関するメタデータの一部で、プロデューサーによってフラグメントが生成された時点のタイムスタンプを指します。 引用元
GetDataEndpoint
を利用- KVSのARNから、
GetMediaForFragmentList
APIの呼び出しに必要なエンドポイントを取得します
- KVSのARNから、
GetMediaForFragmentList
を利用- KVSのARNとフラグメントリストから、メディアデータを取得します
メディアデータから音声データを抽出し、WAV形式に変換するコードについては、先程の記事に解説を載せていますので、ご参考ください。
Lambdaの実行時間
ちなみに、Connectで10秒程度を録音した際、KVSからLambdaで音声データを取得し、WAV形式でS3にファイルをアップロードする処理時間は、GetMedia
APIでは7秒、GetMediaForFragmentList
APIでは4秒程度だったため、処理時間も短くなりました。
短くなることで、何がうれしいかというと、ConnectフローからLambdaを呼び出す際、最大8秒でタイムアウトになってしまいます。
GetMediaForFragmentList
を利用すると、4秒程度だったため、タイムアウトする可能性が低くなり、Lambdaからのレスポンス内容をフローで利用することができます。
最後に
今回の記事では、ConnectからKVSに送信したメディアデータから音声データを取得する場合、GetMediaForFragmentList
を利用すべき理由とPythonのコードについて解説しました。
GetMedia
APIはリアルタイム処理に適しており、一方、バッチ処理のケースではGetMediaForFragmentList
APIを利用することが適切だと分かりました。
この情報が参考になれば幸いです。